library(tidyverse)
## Warning: package 'tidyverse' was built under R version 3.5.3
## -- Attaching packages ----------------------------------------------------------------------------------------------------------------------------------- tidyverse 1.2.1 --
## v ggplot2 3.1.0 v purrr 0.2.5
## v tibble 1.4.2 v dplyr 0.7.8
## v tidyr 0.8.2 v stringr 1.3.1
## v readr 1.3.1 v forcats 0.3.0
## Warning: package 'readr' was built under R version 3.5.2
## Warning: package 'forcats' was built under R version 3.5.2
## -- Conflicts -------------------------------------------------------------------------------------------------------------------------------------- tidyverse_conflicts() --
## x dplyr::filter() masks stats::filter()
## x dplyr::lag() masks stats::lag()
library(DT)
# Report section selector, supplied via the R Markdown `params` list
# (1 = row operations, 2 = column operations, 3 = aggregate/mixed operations;
# see the if() blocks at the bottom of this script).
# NOTE(review): this variable shadows base R's `options`; calls like
# options(...) still resolve to the function, but a more specific name
# (e.g. report_option) would be clearer.
options = params$options
# Read one headerless benchmark CSV and attach the standard column names.
#
# @param filename Path to a results CSV written without a header row.
# @return A data.frame with the eight benchmark columns named below.
read.clean.files <- function(filename) {
  result <- read.csv(filename, header = FALSE)
  colnames(result) <- c(
    "Language", "Randomize", "Dataset", "MachineID",
    "RunID", "Type", "Operation", "TimeTaken"
  )
  result
}
# List every results CSV under ../Results/ and split the paths by setup.
# BUG FIX: the file pattern must escape the dot ("\\.csv$"); the original
# bare ".csv$" treats '.' as a regex wildcard and would also match names
# such as "report-xcsv".
files <- list.files(path = "../Results/", pattern = "\\.csv$",
                    recursive = TRUE, full.names = TRUE)
#files
# fixed = TRUE: these are literal substrings, not regular expressions.
databricks.files <- files[grepl("Databricks", files, fixed = TRUE)]
local.vm..files <- files[grepl("Local_VM", files, fixed = TRUE)]
# Read each setup's headerless CSVs, stack them, and tag every row with its
# execution environment before combining both setups into one data frame.
merged.databricks <- do.call(rbind, lapply(databricks.files, read.csv, header = FALSE))
merged.databricks$Setup <- "Databricks"
merged.local.vm <- do.call(rbind, lapply(local.vm..files, read.csv, header = FALSE))
merged.local.vm$Setup <- "Local VM"
merged_data <- rbind(merged.databricks, merged.local.vm)
merged_data$Setup <- as.factor(merged_data$Setup)
colnames(merged_data) <- c("Language", "Randomize", "Dataset", "MachineID",
                           "RunID", "Type", "Operation", "TimeTaken", "Setup")
# Harmonise "Operations" vs "Operation" labels, then drop warm-up run 1.
merged_data$Type <- as.factor(gsub("Operations", "Operation", merged_data$Type))
merged_data <- filter(merged_data, RunID != 1)
# Convert the experiment identifiers to factors so they group cleanly.
for (id_col in c("MachineID", "Randomize", "RunID")) {
  merged_data[[id_col]] <- as.factor(merged_data[[id_col]])
}
# Reduce "dataset_100MB"-style labels to the bare nominal size ("100").
merged_data$Dataset <- as.factor(sub("MB$", "", sub("dataset_", "", merged_data$Dataset)))
str(merged_data)
## 'data.frame': 5177 obs. of 9 variables:
## $ Language : Factor w/ 2 levels "PySpark","Scala": 1 1 1 1 1 1 1 1 1 1 ...
## $ Randomize: Factor w/ 1 level "1": 1 1 1 1 1 1 1 1 1 1 ...
## $ Dataset : Factor w/ 5 levels "10","100","200",..: 2 2 2 2 2 2 2 2 2 2 ...
## $ MachineID: Factor w/ 2 levels "1","2": 1 1 1 1 1 1 1 1 1 1 ...
## $ RunID : Factor w/ 5 levels "2","3","4","5",..: 1 1 1 1 1 1 1 1 1 1 ...
## $ Type : Factor w/ 4 levels "Aggregate Operation",..: 2 4 3 2 3 2 3 2 2 1 ...
## $ Operation: Factor w/ 37 levels " Filter"," Filter Reg Ex 1",..: 4 21 18 10 19 28 17 24 29 7 ...
## $ TimeTaken: num 15.44 7.04 11.72 16.64 11.28 ...
## $ Setup : Factor w/ 2 levels "Databricks","Local VM": 1 1 1 1 1 1 1 1 1 1 ...
# Spot-check the merged data: first rows and per-column summaries.
head(merged_data)
## Language Randomize Dataset MachineID RunID Type
## 1 PySpark 1 100 1 2 Column Operation
## 2 PySpark 1 100 1 2 Row Operation
## 3 PySpark 1 100 1 2 Mixed Operation
## 4 PySpark 1 100 1 2 Column Operation
## 5 PySpark 1 100 1 2 Mixed Operation
## 6 PySpark 1 100 1 2 Column Operation
## Operation TimeTaken Setup
## 1 Full Outer Join 10 Columns 15.442877 Databricks
## 2 Running Sum 7.039728 Databricks
## 3 Pivot 10 Rows and 1 Column 11.719025 Databricks
## 4 Inner Join 5 Columns 16.637699 Databricks
## 5 Pivot 5 Rows and 1 Column 11.277677 Databricks
## 6 Sorting Desc 5 column 10.368395 Databricks
summary(merged_data)
## Language Randomize Dataset MachineID RunID
## PySpark:2267 1:5177 10 :1200 1:2762 2:1173
## Scala :2910 100:1290 2:2415 3:1173
## 200: 945 4:1038
## 300: 932 5: 903
## 500: 810 6: 890
##
##
## Type Operation
## Aggregate Operation: 698 GroupBy 10 columns : 175
## Column Operation :2559 Merge 2 columns into 1 : 175
## Mixed Operation : 524 Merge 5 columns into 1 : 175
## Row Operation :1396 Pivot 1 Rows and 1 Column : 175
## Pivot 10 Rows and 1 Column: 175
## Ranking by Group : 175
## (Other) :4127
## TimeTaken Setup
## Min. : 0.237 Databricks:2972
## 1st Qu.: 3.792 Local VM :2205
## Median : 11.450
## Mean : 25.430
## 3rd Qu.: 27.981
## Max. :353.195
##
# Actual on-disk size (MB) of each generated dataset, measured once with
# file.size("../../Data/Databricks/machine2/dataset_<n>MB.csv") / (1024 * 1024)
# and hard-coded here so the report does not need the raw data files.
size_10MB <- 11.4789848327637
size_100MB <- 115.640992164612
size_200MB <- 229.8573
size_300MB <- 343.2709
size_500MB <- 576.678165435791
dataset_sizes <- c("10" = size_10MB, "100" = size_100MB, "200" = size_200MB,
                   "300" = size_300MB, "500" = size_500MB)
for (nominal in names(dataset_sizes)) {
  print(paste0("Actual Size of ", nominal, "MB file (in MB) ",
               dataset_sizes[[nominal]]))
}
# Lookup table used below to merge the real size onto every observation.
size_info <- data.frame(Dataset = names(dataset_sizes),
                        Size = unname(dataset_sizes))
str(size_info)
## 'data.frame': 5 obs. of 2 variables:
## $ Dataset: Factor w/ 5 levels "10","100","200",..: 1 2 3 4 5
## $ Size : num 11.5 115.6 229.9 343.3 576.7
# Join each observation to its dataset's actual size and derive throughput
# (MB processed per second). Dataset is the only shared column, so the
# explicit `by` matches the original default-merge behaviour.
merged_data <- merge(merged_data, size_info, by = "Dataset") %>%
  mutate(Throughput = Size / TimeTaken)
# Common Functions ----
# Summarise a grouped data frame: sample size, mean, standard deviation, and
# coefficient of variation for both TimeTaken and Throughput.
#
# @param grouped_data A data frame grouped with dplyr::group_by().
# @return One row per group with n, Mean_*, Std_Dev_* and Coeff_Var_* columns.
summarize_results <- function(grouped_data) {
  grouped_data %>%
    summarise(n = n()
              ,Mean_Time = round(mean(TimeTaken), 2)
              ,Std_Dev_Time = round(sd(TimeTaken), 2)
              # BUG FIX: the coefficient of variation is sd / mean; the
              # original computed mean / sd (its reciprocal).
              ,Coeff_Var_Time = round(Std_Dev_Time / Mean_Time, 2)
              ,Mean_Throughput = round(mean(Throughput), 2)
              ,Std_Dev_Throughput = round(sd(Throughput), 2)
              ,Coeff_Var_Throughput = round(Std_Dev_Throughput / Mean_Throughput, 2)
    )
}
# Draw a boxplot of TimeTaken for every group in `grouped_data`, filled by
# `by_var`, faceted on all of the grouping variables.
#
# @param grouped_data A dplyr-grouped data frame with a TimeTaken column.
# @param by_var Column name (string) mapped to the box fill aesthetic.
# @return The input data with an added factor column Index (one level per group).
plot_hist <- function(grouped_data, by_var) {
  # group_indices() numbers the groups; one as.factor() suffices (the
  # original converted to factor twice).
  grouped_data$Index <- grouped_data %>%
    dplyr::group_indices() %>%
    as.factor()
  # Build "~ var1 + var2 + ..." from the grouping variables for facet_wrap().
  facet_form <- as.formula(paste0("~", paste(grouped_data %>% dplyr::group_vars(), collapse = " + ")))
  print(ggplot(grouped_data, aes_string(x = "Index", y = "TimeTaken", fill = by_var)) +
          geom_boxplot() +
          facet_wrap(facet_form, scales = 'free', ncol = 4, labeller = label_both))
  return(grouped_data)
}
# Compare the Databricks and Local VM setups: returns a summary table grouped
# down to Setup and draws boxplots where Setup drives the fill colour.
#
# @param arData Benchmark data frame (Type/Operation/Language/... columns).
# @return Per-group summary table from summarize_results().
databricks_vs_localVM <- function(arData) {
  summary_tbl <- arData %>%
    group_by(Type, Operation, Language, MachineID, Dataset, Setup) %>%
    summarize_results()
  plot_groups <- arData %>%
    group_by(Type, Operation, Language, MachineID, Dataset)
  plot_hist(grouped_data = plot_groups, by_var = "Setup")
  return(summary_tbl)
}
# Compare PySpark vs. Scala runs.
#
# @param arData Benchmark data frame (Type/Operation/Dataset/... columns).
# @param arOpt 1 = summary table only, 2 = plots only (default), 0 = both.
# @return The summary table when arOpt is 0 or 1, otherwise NA.
PySpark_vs_Scala <- function(arData, arOpt = 2) {
  result <- NA
  # BUG FIX (idiom): use the scalar, short-circuiting `||` inside if();
  # the original used the vectorized `|`.
  if (arOpt == 1 || arOpt == 0) {
    result <- arData %>%
      group_by(Type, Operation, Dataset, MachineID, Setup, Language) %>%
      summarize_results()
  }
  if (arOpt == 2 || arOpt == 0) {
    group <- arData %>%
      group_by(Type, Operation, Dataset, MachineID, Setup)
    plot_hist(grouped_data = group, by_var = "Language")
  }
  return(result)
}
# PySpark vs. Scala ----
# Row Operations
# Section 1: row operations — PySpark vs. Scala boxplots only.
if (options == 1) {
  filtered <- PySpark_vs_Scala(filter(merged_data, Type == "Row Operation"), arOpt = 2)
}
# Column Operations
# Section 2: column operations — PySpark vs. Scala boxplots only.
if (options == 2) {
  filtered <- PySpark_vs_Scala(filter(merged_data, Type == "Column Operation"), arOpt = 2)
}

# Aggregate and Mixed Operations
# Section 3: aggregate and mixed operations, plotted one type at a time.
if (options == 3) {
  for (op_type in c("Aggregate Operation", "Mixed Operation")) {
    filtered <- PySpark_vs_Scala(filter(merged_data, Type == op_type), arOpt = 2)
  }
}